<!DOCTYPE html>



  


<html class="theme-next mist use-motion" lang="zh-Hans">
<head>
  <meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<!-- NOTE(review): removed maximum-scale=1 — it disabled pinch-zoom, a WCAG 1.4.4 failure;
     also dropped the XML-style "/" on void elements -->
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="theme-color" content="#222">



  
  
    
    
  <!-- Pace: page-load progress bar; loaded early so it can track the rest of the page -->
  <script src="/lib/pace/pace.min.js?v=1.0.2"></script>
  <link href="/lib/pace/pace-theme-minimal.min.css?v=1.0.2" rel="stylesheet">






  <!-- Ask transcoding proxies (common on Chinese mobile carriers/browsers) not to rewrite the page -->
<meta http-equiv="Cache-Control" content="no-transform" />
<meta http-equiv="Cache-Control" content="no-siteapp" />
















  
  
  <!-- fancybox lightbox styles (enabled via CONFIG.fancybox) -->
  <link href="/lib/fancybox/source/jquery.fancybox.css?v=2.1.5" rel="stylesheet">

  <!-- icon font + theme main stylesheet.
       NOTE(review): type="text/css" omitted — it is the default for rel="stylesheet" -->
  <link href="/lib/font-awesome/css/font-awesome.min.css?v=4.6.2" rel="stylesheet">
  <link href="/css/main.css?v=5.1.4" rel="stylesheet">

  <!-- favicons / touch icons (query string is a cache-buster tied to the theme version) -->
  <link rel="apple-touch-icon" sizes="180x180" href="/images/apple-touch-icon-next.png?v=5.1.4">
  <link rel="icon" type="image/png" sizes="32x32" href="/images/favicon-32x32-next.png?v=5.1.4">
  <link rel="icon" type="image/png" sizes="16x16" href="/images/favicon-16x16-next.png?v=5.1.4">
  <link rel="mask-icon" href="/images/logo.svg?v=5.1.4" color="#222">





  <!-- Search-engine keywords -->
  <meta name="keywords" content="Hexo, NexT" />


  <!-- Atom feed for the whole site -->
  <link rel="alternate" href="/atom.xml" title="yampery" type="application/atom+xml" />


  <!-- Post excerpt used as the search/share description -->
<meta name="description" content="本文介绍distcp的使用、注意事项，最后从源码上简要分析其工作原理和模式。 distcp使用 DistCp Version 2(分布式copy)是用于集群间/集群内的文件copy工具， 使用MapReduce实现分布式、错误处理、恢复和报告。distCp会根据目录文件生成map任务， 每一个任务会copy部分文件内容。   基本使用 最常使用的是集群间copy   1hadoop distcp">
<!-- Open Graph tags for link previews (Facebook/WeChat/etc.) -->
<meta property="og:type" content="article">
<meta property="og:title" content="Hadoop集群间文件拷贝">
<meta property="og:url" content="https://yampery.gitee.io/2019/02/25/distcp/index.html">
<meta property="og:site_name" content="yampery">
<meta property="og:description" content="本文介绍distcp的使用、注意事项，最后从源码上简要分析其工作原理和模式。 distcp使用 DistCp Version 2(分布式copy)是用于集群间/集群内的文件copy工具， 使用MapReduce实现分布式、错误处理、恢复和报告。distCp会根据目录文件生成map任务， 每一个任务会copy部分文件内容。   基本使用 最常使用的是集群间copy   1hadoop distcp">
<meta property="og:locale" content="zh-Hans">
<meta property="og:image" content="https://i.imgur.com/EjuUxrL.png">
<meta property="og:image" content="https://i.imgur.com/38o3TKy.png">
<meta property="og:updated_time" content="2019-02-27T12:36:35.904Z">
<!-- Twitter card tags for link previews -->
<meta name="twitter:card" content="summary">
<meta name="twitter:title" content="Hadoop集群间文件拷贝">
<meta name="twitter:description" content="本文介绍distcp的使用、注意事项，最后从源码上简要分析其工作原理和模式。 distcp使用 DistCp Version 2(分布式copy)是用于集群间/集群内的文件copy工具， 使用MapReduce实现分布式、错误处理、恢复和报告。distCp会根据目录文件生成map任务， 每一个任务会copy部分文件内容。   基本使用 最常使用的是集群间copy   1hadoop distcp">
<meta name="twitter:image" content="https://i.imgur.com/EjuUxrL.png">



<script type="text/javascript" id="hexo.configurations">
  // Global configuration object consumed by the NexT theme's client-side scripts.
  var NexT = window.NexT || {};
  var CONFIG = {
    // Site root path prefix used when the scripts build URLs.
    root: '/',
    // Active NexT scheme; version doubles as the asset cache-buster seen in the <head>.
    scheme: 'Mist',
    version: '5.1.4',
    // Sidebar placement and visibility behaviour.
    sidebar: {"position":"left","display":"post","offset":12,"b2t":false,"scrollpercent":false,"onmobile":false},
    fancybox: true,
    tabs: true,
    // Entrance-animation settings for post blocks and the sidebar.
    motion: {"enable":true,"async":false,"transition":{"post_block":"fadeIn","post_header":"slideDownIn","post_body":"slideDownIn","coll_header":"slideLeftIn","sidebar":"slideUpIn"}},
    // Duoshuo comment-system identity (legacy service; presumably unused — TODO confirm).
    duoshuo: {
      userId: '0',
      author: '博主'
    },
    // Algolia search settings; empty credentials mean search is not configured.
    algolia: {
      applicationID: '',
      apiKey: '',
      indexName: '',
      hits: {"per_page":10},
      labels: {"input_placeholder":"Search for Posts","hits_empty":"We didn't find any results for the search: ${query}","hits_stats":"${hits} results found in ${time} ms"}
    }
  };
</script>



  <!-- Canonical URL for search engines (avoids duplicate-content penalties) -->
  <link rel="canonical" href="https://yampery.gitee.io/2019/02/25/distcp/">

  <title>Hadoop集群间文件拷贝 | yampery</title>

  <!-- NOTE(review): was a protocol-relative URL (//fonts.googleapis.com) — pinned to https:
       so the font loads regardless of how the page is served; redundant type attribute and
       single quotes normalized -->
  <link href="https://fonts.googleapis.com/css?family=Kaushan+Script" rel="stylesheet">
</head>

<body itemscope itemtype="http://schema.org/WebPage" lang="zh-Hans">

  
  
    
  

  <div class="container sidebar-position-left page-post-detail">
    <div class="headband"></div>

    <header id="header" class="header" itemscope itemtype="http://schema.org/WPHeader">
      <div class="header-inner"><div class="site-brand-wrapper">
  <div class="site-meta ">
    <div class="custom-logo-site-title">
      <a href="/" class="brand" rel="start">
        <span class="logo-line-before"><i></i></span>
        <span class="site-title">yampery</span>
        <span class="logo-line-after"><i></i></span>
      </a>
    </div>
    <p class="site-subtitle">天尽头，回眸望红尘</p>
  </div>

  <div class="site-nav-toggle">
    <!-- NOTE(review): added type="button" (a bare <button> defaults to type="submit") and an
         aria-label — the button contains only decorative bars, so it had no accessible name -->
    <button type="button" aria-label="切换导航菜单">
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
    </button>
  </div>
</div>

<!-- Primary site navigation.
     NOTE(review): labeled the landmark, marked the purely decorative Font Awesome icons
     aria-hidden, and dropped the XML-style "/" on the void <br> elements.
     The <br> itself is kept: the theme's CSS relies on it to stack icon above label. -->
<nav class="site-nav" aria-label="主导航">
  <ul id="menu" class="menu">
    <li class="menu-item menu-item-home">
      <a href="/" rel="section">
        <i class="menu-item-icon fa fa-fw fa-home" aria-hidden="true"></i> <br>
        首页
      </a>
    </li>
    <li class="menu-item menu-item-about">
      <a href="/about/" rel="section">
        <i class="menu-item-icon fa fa-fw fa-user" aria-hidden="true"></i> <br>
        关于
      </a>
    </li>
    <li class="menu-item menu-item-tags">
      <a href="/tags/" rel="section">
        <i class="menu-item-icon fa fa-fw fa-tags" aria-hidden="true"></i> <br>
        标签
      </a>
    </li>
    <li class="menu-item menu-item-categories">
      <a href="/categories/" rel="section">
        <i class="menu-item-icon fa fa-fw fa-th" aria-hidden="true"></i> <br>
        分类
      </a>
    </li>
    <li class="menu-item menu-item-archives">
      <a href="/archives/" rel="section">
        <i class="menu-item-icon fa fa-fw fa-archive" aria-hidden="true"></i> <br>
        归档
      </a>
    </li>
  </ul>
</nav>



 </div>
    </header>

    <main id="main" class="main">
      <div class="main-inner">
        <div class="content-wrap">
          <div id="content" class="content">
            

  <div id="posts" class="posts-expand">
    

  

  
  
  

  <article class="post post-type-normal" itemscope itemtype="http://schema.org/Article">
  
  
  
  <div class="post-block">
    <link itemprop="mainEntityOfPage" href="https://yampery.gitee.io/2019/02/25/distcp/">

    <span hidden itemprop="author" itemscope itemtype="http://schema.org/Person">
      <meta itemprop="name" content="yampery">
      <meta itemprop="description" content="">
      <meta itemprop="image" content="/images/avatar.gif">
    </span>

    <span hidden itemprop="publisher" itemscope itemtype="http://schema.org/Organization">
      <meta itemprop="name" content="yampery">
    </span>

    
      <header class="post-header">

        
        
          <h1 class="post-title" itemprop="name headline">Hadoop集群间文件拷贝</h1>
        

        <div class="post-meta">
          <span class="post-time">
            
              <span class="post-meta-item-icon">
                <i class="fa fa-calendar-o"></i>
              </span>
              
                <span class="post-meta-item-text">发表于</span>
              
              <time title="创建于" itemprop="dateCreated datePublished" datetime="2019-02-25T11:05:38+08:00">
                2019-02-25
              </time>
            

            

            
          </span>

          

          
            
          

          
          
             <span id="/2019/02/25/distcp/" class="leancloud_visitors" data-flag-title="Hadoop集群间文件拷贝">
               <span class="post-meta-divider">|</span>
               <span class="post-meta-item-icon">
                 <i class="fa fa-eye"></i>
               </span>
               
                 <span class="post-meta-item-text">阅读次数&#58;</span>
               
                 <span class="leancloud-visitors-count"></span>
             </span>
          

          

          
            <div class="post-wordcount">
              
                
                <span class="post-meta-item-icon">
                  <i class="fa fa-file-word-o"></i>
                </span>
                
                  <span class="post-meta-item-text">字数统计&#58;</span>
                
                <span title="字数统计">
                  6.6k 字
                </span>
              

              
                <span class="post-meta-divider">|</span>
              

              
                <span class="post-meta-item-icon">
                  <i class="fa fa-clock-o"></i>
                </span>
                
                  <span class="post-meta-item-text">阅读时长 &asymp;</span>
                
                <span title="阅读时长">
                  30 分钟
                </span>
              
            </div>
          

          

        </div>
      </header>
    

    
    
    
    <div class="post-body" itemprop="articleBody">

      
      

      
        <p>本文介绍distcp的使用、注意事项，最后从源码上简要分析其工作原理和模式。</p>
<h1 id="distcp使用"><a href="#distcp使用" class="headerlink" title="distcp使用"></a>distcp使用</h1><blockquote>
<p>DistCp Version 2(分布式copy)是用于集群间/集群内的文件copy工具，
使用MapReduce实现分布式、错误处理、恢复和报告。distCp会根据目录文件生成map任务，
每一个任务会copy部分文件内容。</p>
</blockquote>
<hr>
<h2 id="基本使用"><a href="#基本使用" class="headerlink" title="基本使用"></a>基本使用</h2><ul>
<li><p>最常使用的是集群间copy</p>
  <figure class="highlight sh"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">hadoop distcp hdfs://nn1:8020/foo/bar hdfs://nn2:8020/bar/foo</span><br></pre></td></tr></table></figure>
<blockquote>
<p>该命令会将nn1下的<code>/foo/bar</code>放在一个临时文件里，并将内容分割为一系列<code>map tasts</code>，</p>
</blockquote>
<p>  然后在每一个<code>NodeManager</code>上从nn1向nn2拷贝。</p>
<p>  也可在命令行指定多个源文件</p>
  <figure class="highlight sh"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br></pre></td><td class="code"><pre><span class="line">hadoop distcp hdfs://nn1:8020/foo/a </span><br><span class="line">   hdfs://nn1:8020/foo/b </span><br><span class="line">   hdfs://nn2:8020/bar/foo</span><br></pre></td></tr></table></figure>
<p>  同样的，使用-f指定文件，<code>srclist</code>包含a和b</p>
  <figure class="highlight sh"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br></pre></td><td class="code"><pre><span class="line">hadoop distcp -f hdfs://nn1:8020/srclist </span><br><span class="line">  	hdfs://nn2:8020/bar/foo</span><br></pre></td></tr></table></figure>
<blockquote>
<p>在多源<code>copy</code>中，如果源文件冲突，<code>distCp</code>将会终止并发送错误报告，而在目标上的冲突将会根据指定的选项（<code>option</code>）处理。默认情况下，跳过目标集群已经存在的文件而不是替换，跳过文件的数量将会在任务结束的时候报告。不过，如果存在copy任务失败，后续重试成功的情况，任务最终报告的跳过文件的数量将会不准确。</p>
</blockquote>
<blockquote>
<p><strong>注意：每个<code>NodeManager</code>都必须与源文件系统、目标文件系统正常通信。对HDFS来说，源集群和目标集群必须是同一个协议版本（并非组件版本强一致），或使用向下兼容协议</strong></p>
</blockquote>
<blockquote>
<p>Copy完成之后，最好生成文件列表并交叉校验，保证集群间copy结果无误。</p>
</blockquote>
<blockquote>
<p><strong>如果源集群存在客户端写操作，<code>copy</code>任务很可能会失败，尝试覆盖一个在目标集群上正在写的文件也会失败，如果在copy任务开始之前，源文件被删除，<code>copy</code>任务将会失败并抛出<code>FileNotFoundException</code></strong></p>
</blockquote>
</li>
</ul>
<hr>
<h2 id="Update-和-Overwrite"><a href="#Update-和-Overwrite" class="headerlink" title="Update 和 Overwrite"></a>Update 和 Overwrite</h2><pre><code>`-update` 用来从源集群复制目标集群不存在或者版本不一致的文件. `-overwrite` 用于在目标集群如果存在相同的文件时进行覆盖。
</code></pre><ul>
<li><p>例如：从 <code>/source/first/</code> 和 <code>/source/second/</code> 复制到 <code>/target/</code></p>
<p>  源集群有以下文件</p>
<pre><code>hdfs://nn1:8020/source/first/1
hdfs://nn1:8020/source/first/2
hdfs://nn1:8020/source/second/10
hdfs://nn1:8020/source/second/20
</code></pre><p>   当distCp没有使用<code>-update</code> or <code>-overwrite</code>, distCp 默认将会在<code>/target</code>下创建 <code>first/</code> 和 <code>second/</code>，例如:</p>
<pre><code><figure class="highlight sh"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">distcp hdfs://nn1:8020/<span class="built_in">source</span>/first hdfs://nn1:8020/<span class="built_in">source</span>/second hdfs://nn2:8020/target</span><br></pre></td></tr></table></figure>
</code></pre><p>  在目标文件夹<code>/target</code>下产生以下内容：</p>
<pre><code> hdfs://nn2:8020/target/first/1
 hdfs://nn2:8020/target/first/2
 hdfs://nn2:8020/target/second/10
 hdfs://nn2:8020/target/second/20

当指定 `-update` 或 `-overwrite`, 会将源目录下的内容copy到目标下, 而不是源目录，例如:
 <figure class="highlight sh"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">distcp -update hdfs://nn1:8020/<span class="built_in">source</span>/first hdfs://nn1:8020/<span class="built_in">source</span>/second hdfs://nn2:8020/target</span><br></pre></td></tr></table></figure>
</code></pre><p>  在目标文件夹<code>/target</code>下产生以下内容：</p>
<pre><code>hdfs://nn2:8020/target/1
hdfs://nn2:8020/target/2
hdfs://nn2:8020/target/10
hdfs://nn2:8020/target/20
</code></pre><p>  <strong>如果所有源文件都包含同一个文件(如, <code>0</code>)，那所有的源文件都会在目标文件映射一个 <code>/target/0</code> 的实体，DistCp将会失败。</strong></p>
<p>  考虑如下copy操作</p>
  <figure class="highlight sh"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">distcp hdfs://nn1:8020/<span class="built_in">source</span>/first hdfs://nn1:8020/<span class="built_in">source</span>/second hdfs://nn2:8020/target</span><br></pre></td></tr></table></figure>
<p>  源集群上源文件/大小</p>
<pre><code>hdfs://nn1:8020/source/first/1 32
hdfs://nn1:8020/source/first/2 32
hdfs://nn1:8020/source/second/10 64
hdfs://nn1:8020/source/second/20 32
</code></pre><p>  目标集群上文件/大小</p>
<pre><code>hdfs://nn2:8020/target/1 32
hdfs://nn2:8020/target/10 32
hdfs://nn2:8020/target/20 64
</code></pre><p>  将会导致</p>
<pre><code>hdfs://nn2:8020/target/1 32
hdfs://nn2:8020/target/2 32
hdfs://nn2:8020/target/10 64
hdfs://nn2:8020/target/20 32
</code></pre><ul>
<li><code>1</code> 由于大小和内容一致，将会跳过；</li>
<li><code>2</code> 由于不存在，将会直接copy；</li>
<li><code>10</code> 和 <code>20</code> 由于内容不一致，将会覆盖；</li>
<li>如果使用 <code>-overwrite</code>, <code>1</code> 也会被覆盖。</li>
</ul>
</li>
</ul>
<hr>
<h2 id="DistCp体系结构"><a href="#DistCp体系结构" class="headerlink" title="DistCp体系结构"></a>DistCp体系结构</h2><p>DistCp主要有以下三个组件组成:</p>
<ul>
<li>DistCp Driver</li>
<li>Copy-listing generator</li>
<li>Input-formats 和 Map-Reduce components</li>
</ul>
<h3 id="DistCp-Driver"><a href="#DistCp-Driver" class="headerlink" title="DistCp Driver"></a>DistCp Driver</h3><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br></pre></td><td class="code"><pre><span class="line"><span class="comment">// 构造方法，根据输入的参数创建DistCp</span></span><br><span class="line"><span class="function"><span class="keyword">public</span> <span class="title">DistCp</span><span class="params">(Configuration configuration, DistCpOptions inputOptions)</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">    Configuration config = <span class="keyword">new</span> Configuration(configuration);</span><br><span class="line">    config.addResource(DISTCP_DEFAULT_XML);</span><br><span class="line">    setConf(config);</span><br><span class="line">    <span class="keyword">this</span>.inputOptions = inputOptions;</span><br><span class="line">    
<span class="keyword">this</span>.metaFolder   = createMetaFolderPath();</span><br><span class="line">&#125;</span><br><span class="line"></span><br><span class="line"><span class="comment">/**</span></span><br><span class="line"><span class="comment"> * 实现 Tool::run(). 组织源文件拷贝到目标位置</span></span><br><span class="line"><span class="comment"> *  1. 创建将要拷贝到目标的文件列表</span></span><br><span class="line"><span class="comment"> *  2. 运行Map任务. (委托给 execute().)</span></span><br><span class="line"><span class="comment"> * <span class="doctag">@param</span> argv List of arguments passed to DistCp, from the ToolRunner.</span></span><br><span class="line"><span class="comment"> * <span class="doctag">@return</span> On success, it returns 0. Else, -1.</span></span><br><span class="line"><span class="comment"> */</span></span><br><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">int</span> <span class="title">run</span><span class="params">(String[] argv)</span> </span>&#123;</span><br><span class="line">    <span class="keyword">if</span> (argv.length &lt; <span class="number">1</span>) &#123;</span><br><span class="line">      OptionsParser.usage();</span><br><span class="line">      <span class="keyword">return</span> DistCpConstants.INVALID_ARGUMENT;</span><br><span class="line">    &#125;</span><br><span class="line">    </span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">      inputOptions = (OptionsParser.parse(argv));</span><br><span class="line">      setTargetPathExists();</span><br><span class="line">      LOG.info(<span class="string">"Input Options: "</span> + inputOptions);</span><br><span class="line">    &#125; <span class="keyword">catch</span> (Throwable e) &#123;</span><br><span class="line">      LOG.error(<span class="string">"Invalid arguments: "</span>, e);</span><br><span class="line">      System.err.println(<span class="string">"Invalid arguments: "</span> 
+ e.getMessage());</span><br><span class="line">      OptionsParser.usage();      </span><br><span class="line">      <span class="keyword">return</span> DistCpConstants.INVALID_ARGUMENT;</span><br><span class="line">    &#125;</span><br><span class="line">	......</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>  DistCp Driver 组件负责以下内容:</p>
<ul>
<li><p>将命令行命令参数解析传给DistCp, 通过:</p>
<ul>
<li>OptionsParser： 通过 <code>public static DistCpOptions parse(String args[]);</code> 方法解析命令行参数，并创建相关的Options对象；</li>
<li>DistCpOptionsSwitch： <code>public enum DistCpOptionSwitch</code>, 一个枚举类，与命令行参数的key（-i, -p）相映射。</li>
</ul>
</li>
<li><p>将命令参数封装在合适的 <code>DistCpOptions</code> 对象中，初始化 <code>DistCp</code>，参数包括：</p>
<ul>
<li>源路径</li>
<li>目标位置</li>
<li>复制选项 (例如：是否采用 <code>update</code>、<code>overwrite</code> 复制，保留文件那些属性等。)</li>
</ul>
</li>
</ul>
<ul>
<li><p>通过以下过程组织copy</p>
<ul>
<li>调用copy-listing-generator 创建要copy的文件列表；</li>
<li>构件并执行 <code>Map-Reduce</code> 来执行copy；</li>
<li>Based on the options, either returning a handle to the Hadoop MR Job
immediately, or waiting till completion.</li>
<li>基于用户指定的选项，立即返回MR任务操作或等待任务完成。</li>
</ul>
</li>
<li><p>Copy-Listing 生成器</p>
<p>The copy-listing-generator类负责创建源集群要复制的文件/文件夹列表。检验源路径中的内容，记录所有需要copy到SequenceFile中的paths，供DistCp消费。其主要包括以下模块：</p>
<ol>
<li>CopyListing: 上层接口，具体的copy-listing-generator通过实现此接口来实现列表生成功能。提供工厂方法供构造CopyListing的实现来选择； </li>
</ol>
<p><img src="https://i.imgur.com/EjuUxrL.png" alt=""></p>
<ol start="2">
<li>SimpleCopyListing: CopyListing的一个实现，接收多源路径，并且递归地列出每个path下的所有单独文件和目录，用于copy；</li>
<li>GlobbedCopyListing: CopyListing的一个实现，支持源路径存在通配符；</li>
<li>FileBasedCopyListing: CopyListing的一个实现，从一个指定文件中读取源路径列表；</li>
</ol>
</li>
</ul>
<pre><code>根据是否在DistCpOptions中指定源文件列表，source-list通过以下方式生成：

1. 如果没有指定source-file-list, 使用GlobbedCopyListing。 扩展所有的通配符，所有扩展发送给SimpleCopyListing，依次构造listing(通过向下递归方式)；
2. 如果指定source-file-list, 使用FileBasedCopyListing。从指定的文件中读取Source-paths, 然后发送给GlobbedCopyListing，像上面一样构造listing。

GlobbedCopyListing构造listing如下：
</code></pre><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br></pre></td><td class="code"><pre><span class="line"><span class="meta">@Override</span></span><br><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">doBuildListing</span><span class="params">(Path pathToListingFile,</span></span></span><br><span class="line"><span class="function"><span class="params">                           DistCpOptions options)</span> <span class="keyword">throws</span> IOException </span>&#123;</span><br><span class="line"></span><br><span class="line">  List&lt;Path&gt; globbedPaths = <span class="keyword">new</span> ArrayList&lt;Path&gt;();</span><br><span class="line">  <span class="keyword">if</span> (options.getSourcePaths().isEmpty()) &#123;</span><br><span class="line">    <span class="keyword">throw</span> <span class="keyword">new</span> InvalidInputException(<span class="string">"Nothing to process. 
Source paths::EMPTY"</span>);  </span><br><span class="line">  &#125;</span><br><span class="line"></span><br><span class="line">  <span class="keyword">for</span> (Path p : options.getSourcePaths()) &#123;</span><br><span class="line">    FileSystem fs = p.getFileSystem(getConf());</span><br><span class="line">    FileStatus[] inputs = fs.globStatus(p);</span><br><span class="line"></span><br><span class="line">    <span class="keyword">if</span>(inputs != <span class="keyword">null</span> &amp;&amp; inputs.length &gt; <span class="number">0</span>) &#123;</span><br><span class="line">      <span class="keyword">for</span> (FileStatus onePath: inputs) &#123;</span><br><span class="line">        globbedPaths.add(onePath.getPath());</span><br><span class="line">      &#125;</span><br><span class="line">    &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">      <span class="keyword">throw</span> <span class="keyword">new</span> InvalidInputException(p + <span class="string">" doesn't exist"</span>);        </span><br><span class="line">    &#125;</span><br><span class="line">  &#125;</span><br><span class="line"></span><br><span class="line">  DistCpOptions optionsGlobbed = <span class="keyword">new</span> DistCpOptions(options);</span><br><span class="line">  optionsGlobbed.setSourcePaths(globbedPaths);</span><br><span class="line">  simpleListing.buildListing(pathToListingFile, optionsGlobbed);</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<h3 id="InputFormats-和-MapReduce-组件"><a href="#InputFormats-和-MapReduce-组件" class="headerlink" title="InputFormats 和 MapReduce 组件"></a>InputFormats 和 MapReduce 组件</h3><p>  这两个组件负责实际上从源到目标的文件复制。当copy开始实行时，在copy-listing时生成的listing-file此时将被消费。</p>
<blockquote>
<p>Hadoop中，InputFormat接口定义的方法就是如何读取文件和分割文件以提供分片给mapper。
InputSplit算是Hadoop的一种存储格式，是Hadoop定义的用来传送给每个单独的map的数据，InputSplit存储的并非数据本身，而是一个分片长度和一个记录数据位置的数组。生成InputSplit的方法可以通过InputFormat来设置。</p>
</blockquote>
<ul>
<li><strong>UniformSizeInputFormat:</strong>
实现了 <code>org.apache.hadoop.mapreduce.InputFormat</code>. UniformSizeInputFormat的宗旨是使每个map任务大约有相同的字节数量。
通过配置文件确定map数量和每个map任务的字节数，map数量通过JobContex配置，总copy字节数通过DistCp常量配置。</li>
</ul>
<pre><code><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> List&lt;InputSplit&gt; <span class="title">getSplits</span><span class="params">(Configuration configuration, <span class="keyword">int</span> numSplits, <span class="keyword">long</span> totalSizeBytes)</span> <span class="keyword">throws</span> 
IOException </span>&#123;</span><br><span class="line">            List&lt;InputSplit&gt; splits = <span class="keyword">new</span> ArrayList&lt;InputSplit&gt;(numSplits);</span><br><span class="line">              <span class="keyword">long</span> nBytesPerSplit = (<span class="keyword">long</span>) Math.ceil(totalSizeBytes * <span class="number">1.0</span> / numSplits);</span><br><span class="line">          </span><br><span class="line">              CopyListingFileStatus srcFileStatus = <span class="keyword">new</span> CopyListingFileStatus();</span><br><span class="line">              Text srcRelPath = <span class="keyword">new</span> Text();</span><br><span class="line">              <span class="keyword">long</span> currentSplitSize = <span class="number">0</span>;</span><br><span class="line">              <span class="keyword">long</span> lastSplitStart = <span class="number">0</span>;</span><br><span class="line">              <span class="keyword">long</span> lastPosition = <span class="number">0</span>;</span><br><span class="line">          </span><br><span class="line">              <span class="keyword">final</span> Path listingFilePath = getListingFilePath(configuration);</span><br><span class="line">          </span><br><span class="line">              <span class="keyword">if</span> (LOG.isDebugEnabled()) &#123;</span><br><span class="line">                LOG.debug(<span class="string">"Average bytes per map: "</span> + nBytesPerSplit +</span><br><span class="line">                    <span class="string">", Number of maps: "</span> + numSplits + <span class="string">", total size: "</span> + totalSizeBytes);</span><br><span class="line">              &#125;</span><br><span class="line">              SequenceFile.Reader reader=<span class="keyword">null</span>;</span><br><span class="line">              <span class="keyword">try</span> &#123;</span><br><span class="line">                reader = 
getListingFileReader(configuration);</span><br><span class="line">                <span class="keyword">while</span> (reader.next(srcRelPath, srcFileStatus)) &#123;</span><br><span class="line">                  <span class="comment">// 如果添加该文件导致每个map超出了最大字节数限制，将当前文件添加到新的split</span></span><br><span class="line">                  <span class="keyword">if</span> (currentSplitSize + srcFileStatus.getLen() &gt; nBytesPerSplit &amp;&amp; lastPosition != <span class="number">0</span>) &#123;</span><br><span class="line">                    FileSplit split = <span class="keyword">new</span> FileSplit(listingFilePath, lastSplitStart,</span><br><span class="line">                        lastPosition - lastSplitStart, <span class="keyword">null</span>);</span><br><span class="line">                    <span class="keyword">if</span> (LOG.isDebugEnabled()) &#123;</span><br><span class="line">                      LOG.debug (<span class="string">"Creating split : "</span> + split + <span class="string">", bytes in split: "</span> + currentSplitSize);</span><br><span class="line">                    &#125;</span><br><span class="line">                    splits.add(split);</span><br><span class="line">                    lastSplitStart = lastPosition;</span><br><span class="line">                    currentSplitSize = <span class="number">0</span>;</span><br><span class="line">                  &#125;</span><br><span class="line">                  currentSplitSize += srcFileStatus.getLen();</span><br><span class="line">                  lastPosition = reader.getPosition();</span><br><span class="line">                &#125;</span><br><span class="line">                <span class="keyword">if</span> (lastPosition &gt; lastSplitStart) &#123;</span><br><span class="line">                  FileSplit split = <span class="keyword">new</span> FileSplit(listingFilePath, lastSplitStart,</span><br><span class="line">                      lastPosition - lastSplitStart, <span 
class="keyword">null</span>);</span><br><span class="line">                  <span class="keyword">if</span> (LOG.isDebugEnabled()) &#123;</span><br><span class="line">                    LOG.info (<span class="string">"Creating split : "</span> + split + <span class="string">", bytes in split: "</span> + currentSplitSize);</span><br><span class="line">                  &#125;</span><br><span class="line">                  splits.add(split);</span><br><span class="line">                &#125;</span><br><span class="line">          </span><br><span class="line">              &#125; <span class="keyword">finally</span> &#123;</span><br><span class="line">                IOUtils.closeStream(reader);</span><br><span class="line">              &#125;</span><br><span class="line">          </span><br><span class="line">              <span class="keyword">return</span> splits;</span><br><span class="line">            &#125;</span><br></pre></td></tr></table></figure>
</code></pre><ul>
<li><p><strong>DynamicInputFormat 和 DynamicRecordReader:</strong></p>
<p>DynamicInputFormat 实现了 <code>org.apache.hadoop.mapreduce.InputFormat</code>，
在DFS上将copy-list分成一个个chunk，创建一系列splits，每个split根据自己的能力消费chunks，这种模式可以避免最慢的mapper拖累整个任务。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br></pre></td><td class="code"><pre><span class="line"> <span class="function"><span class="keyword">private</span> List&lt;InputSplit&gt; <span class="title">createSplits</span><span class="params">(JobContext jobContext,</span></span></span><br><span class="line"><span class="function"><span class="params">                                      List&lt;DynamicInputChunk&gt; chunks)</span></span></span><br><span class="line"><span class="function">	<span class="keyword">throws</span> IOException </span>&#123;</span><br><span class="line">	<span class="keyword">int</span> numMaps = getNumMapTasks(jobContext.getConfiguration());</span><br><span class="line">	</span><br><span class="line">	<span class="keyword">final</span> <span class="keyword">int</span> nSplits = Math.min(numMaps, chunks.size());</span><br><span class="line">	List&lt;InputSplit&gt; splits = <span class="keyword">new</span> ArrayList&lt;InputSplit&gt;(nSplits);</span><br><span class="line">	</span><br><span class="line">	<span class="keyword">for</span> (<span class="keyword">int</span> i=<span class="number">0</span>; i&lt; nSplits; ++i) &#123;</span><br><span class="line">	  TaskID taskId = <span class="keyword">new</span> TaskID(jobContext.getJobID(), 
TaskType.MAP, i);</span><br><span class="line">	  chunks.get(i).assignTo(taskId);</span><br><span class="line">	  splits.add(<span class="keyword">new</span> FileSplit(chunks.get(i).getPath(), <span class="number">0</span>,</span><br><span class="line">	      <span class="comment">// Setting non-zero length for FileSplit size, to avoid a possible</span></span><br><span class="line">	      <span class="comment">// future when 0-sized file-splits are considered "empty" and skipped</span></span><br><span class="line">	      <span class="comment">// over.</span></span><br><span class="line">	      getMinRecordsPerChunk(jobContext.getConfiguration()),</span><br><span class="line">	      <span class="keyword">null</span>));</span><br><span class="line">	&#125;</span><br><span class="line">	DistCpUtils.publish(jobContext.getConfiguration(),</span><br><span class="line">	                    CONF_LABEL_NUM_SPLITS, splits.size());</span><br><span class="line">	<span class="keyword">return</span> splits;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>DynamicRecordReader 配合DynamicInputFormat实现一种’Worker-pattern’复制，保证能者多劳，主要干两件事：</p>
<ol>
<li>将每个chunk的内容递交给DistCp的mapper；</li>
<li>当当前chunk消费完后，立即获取下一个chunk。  </li>
</ol>
</li>
</ul>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">boolean</span> <span class="title">nextKeyValue</span><span class="params">()</span></span></span><br><span class="line"><span class="function">      <span class="keyword">throws</span> IOException, InterruptedException </span>&#123;</span><br><span class="line"></span><br><span class="line">	    <span class="keyword">if</span> (chunk == <span class="keyword">null</span>) &#123;</span><br><span class="line">	      <span class="keyword">if</span> (LOG.isDebugEnabled())</span><br><span class="line">	        LOG.debug(taskId + <span class="string">": RecordReader is null. 
No records to be read."</span>);</span><br><span class="line">	      <span class="keyword">return</span> <span class="keyword">false</span>;</span><br><span class="line">	    &#125;</span><br><span class="line">	</span><br><span class="line">	    <span class="keyword">if</span> (chunk.getReader().nextKeyValue()) &#123;</span><br><span class="line">	      ++numRecordsProcessedByThisMap;</span><br><span class="line">	      <span class="keyword">return</span> <span class="keyword">true</span>;</span><br><span class="line">	    &#125;</span><br><span class="line">	</span><br><span class="line">	    <span class="keyword">if</span> (LOG.isDebugEnabled())</span><br><span class="line">	      LOG.debug(taskId + <span class="string">": Current chunk exhausted. "</span> +</span><br><span class="line">	                         <span class="string">" Attempting to pick up new one."</span>);</span><br><span class="line">	</span><br><span class="line">	    chunk.release();</span><br><span class="line">	    timeOfLastChunkDirScan = System.currentTimeMillis();</span><br><span class="line">	    isChunkDirAlreadyScanned = <span class="keyword">false</span>;</span><br><span class="line">	    </span><br><span class="line">	    chunk = chunkContext.acquire(taskAttemptContext);</span><br><span class="line">	</span><br><span class="line">	    <span class="keyword">if</span> (chunk == <span class="keyword">null</span>) <span class="keyword">return</span> <span class="keyword">false</span>;</span><br><span class="line">	</span><br><span class="line">	    <span class="keyword">if</span> (chunk.getReader().nextKeyValue()) &#123;</span><br><span class="line">	      ++numRecordsProcessedByThisMap;</span><br><span class="line">	      <span class="keyword">return</span> <span class="keyword">true</span>;</span><br><span class="line">	    &#125;</span><br><span class="line">	    <span class="keyword">else</span> &#123;</span><br><span class="line">	      <span class="keyword">return</span> <span 
class="keyword">false</span>;</span><br><span class="line">	    &#125;</span><br><span class="line">  	&#125;</span><br></pre></td></tr></table></figure>
<ul>
<li><p><strong>CopyMapper:</strong>
实现 <code>Mapper</code>, 该类实现物理上的文件拷贝. 输入路径，参照指定参数决定是否拷贝文件. 只有出现以下情形之一，文件才会被拷贝:</p>
<ul>
<li>目标不存在同名文件；</li>
<li>目标上存在同名文件，但文件大小不同；</li>
<li>目标上存在同名文件，但是文件校验和不同，并且没有指定 <code>-skipcrccheck</code>；</li>
<li>目标上存在同名文件，但是指定了 <code>-overwrite</code>；</li>
<li>目标上存在同名的文件, 但在块大小上有所不同（块大小需要被保留）。</li>
</ul>
</li>
<li><p><strong>CopyCommitter:</strong> 继承了 <code>FileOutputCommitter</code>, 负责DistCp的任务提交阶段:</p>
<ul>
<li>维护目录访问权限（如果指定了相应参数）；</li>
<li>清理元文件夹、临时文件（meta-folder, DistCp维护的文件表）；</li>
<li>将数据从临时工作文件夹原子移动（atom-move）到最终路径（如果指定atomic-commit）；</li>
<li>从目标上删除源上丢失的文件（如果指定操作）；</li>
<li>清理所有不完全拷贝文件。</li>
</ul>
</li>
</ul>
<h2 id="Map-sizing"><a href="#Map-sizing" class="headerlink" title="Map sizing"></a>Map sizing</h2><p>  默认地，DistCp尝试让每个map的大小相当，即每个map拷贝大约相同的字节数。注意，文件是最好的粒度级别，因此，增加并行的copier（map）的数量并不总能相应的提高并行拷贝的数量和总的吞吐量。</p>
<p>  新的DistCp同时提供一种策略来”动态”调整maps的大小，允许较快的data-nodes拷贝更多的字节。使用 <code>-strategy dynamic</code>, 将文件（files）分割成多个sets，而不是为每个map任务分配固定的源文件集合。sets的数量会超过maps的数量，通常是maps数量的2-3倍，每个map获取并拷贝一个chunk中所有列出的文件。当一个map使用完一个chunk，会获取并处理一个新的chunk，直到没有更多的chunks。</p>
<p>  最终，处理速度较快的map任务将会处理更多的chunks，就mapper的处理能力来看，是公平的。</p>
<p>  这种动态策略通过DynamicInputFormat实现。大多数情况下能够获得较好的性能。</p>
<p>  对于长时间运行、周期性的任务，建议将maps的数量调整为源群集和目标群集的大小、副本的大小以及可用带宽。（Tuning the number of maps to the size of the source and destination clusters,
  the size of the copy, and the available bandwidth is recommended for
  long-running and regularly run jobs.）</p>
<hr>
<h2 id="不同版本间拷贝"><a href="#不同版本间拷贝" class="headerlink" title="不同版本间拷贝"></a>不同版本间拷贝</h2><p>  对于两个不同主版本Hadoop之间的拷贝来说(例如：1.X和2.X)，其中一个通常使用 <code>WebHdfsFileSystem</code>，不同于 <code>HftpFileSystem</code>，webhdfs适用于读和写操作，hftp是只读操作。DistCp可以运行在源集群或目标集群上。远程集群指定 <code>webhdfs://&lt;namenode_hostname&gt;:&lt;http_port&gt;</code>。当主版本相同时，使用hdfs协议性能会更好。</p>
<hr>
<h2 id="MapReduce-和-其他副作用"><a href="#MapReduce-和-其他副作用" class="headerlink" title="MapReduce 和 其他副作用"></a>MapReduce 和 其他副作用</h2><p>  之前提到过，万一一个map拷贝inputs（属于该map的）中的一个失败了，将会带来一些副作用：</p>
<ul>
<li>除非指定了 <code>-overwrite</code>，否则之前的map成功拷贝的文件将被标记为”skipped”；</li>
<li>如果一个map在尝试最大次数(<code>mapreduce.map.maxattempts</code>)后失败，剩下的map任务将被杀死(除非指定 <code>-i</code>)；</li>
<li>If <code>mapreduce.map.speculative</code> is set final and true, the result of the
copy is undefined.</li>
<li>如果设置 <code>mapreduce.map.speculative</code>(<em>hadoop的推测执行，通常集群中的机器配置差异较大才会打开</em>) 为 “true”，拷贝的结果是未定义。</li>
</ul>
<hr>
<h1 id="从源码理解DistCp"><a href="#从源码理解DistCp" class="headerlink" title="从源码理解DistCp"></a>从源码理解DistCp</h1><h2 id="DistCp"><a href="#DistCp" class="headerlink" title="DistCp"></a>DistCp</h2><p>DistCp实现Tool，这样可以使用ToolRunner调度。ToolRunner调度器执行时，调用DistCp的run()方法，一切从这里开始。</p>
<p>阅读DistCp的构造器，首先在DistCp初始化时处理了配置、命令参数等。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="title">DistCp</span><span class="params">(Configuration configuration, DistCpOptions inputOptions)</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">	Configuration config = <span class="keyword">new</span> Configuration(configuration);</span><br><span class="line">	config.addResource(DISTCP_DEFAULT_XML);</span><br><span class="line">	setConf(config);</span><br><span class="line">	<span class="keyword">this</span>.inputOptions = inputOptions;</span><br><span class="line">	<span class="keyword">this</span>.metaFolder   = createMetaFolderPath();</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>run()是一个调度方法，调起execute()，校验输入、处理异常等，</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br></pre></td><td class="code"><pre><span class="line"><span class="comment">// 调度方法，省掉了部分catch</span></span><br><span class="line"><span class="meta">@Override</span></span><br><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">int</span> <span class="title">run</span><span class="params">(String[] argv)</span> </span>&#123;</span><br><span class="line">	<span class="keyword">if</span> (argv.length &lt; <span class="number">1</span>) &#123;</span><br><span class="line">	  OptionsParser.usage();</span><br><span class="line">	  <span class="keyword">return</span> DistCpConstants.INVALID_ARGUMENT;</span><br><span class="line">	&#125;</span><br><span class="line">	</span><br><span class="line">	<span class="keyword">try</span> &#123;</span><br><span class="line">	  inputOptions = (OptionsParser.parse(argv));</span><br><span 
class="line">	  setTargetPathExists();</span><br><span class="line">	  LOG.info(<span class="string">"Input Options: "</span> + inputOptions);</span><br><span class="line">	&#125; <span class="keyword">catch</span> (Throwable e) &#123;</span><br><span class="line">	  </span><br><span class="line">	&#125;</span><br><span class="line">	</span><br><span class="line">	<span class="keyword">try</span> &#123;</span><br><span class="line">	  execute(); <span class="comment">// 调用执行</span></span><br><span class="line">	&#125; <span class="keyword">catch</span> (Exception e) &#123;</span><br><span class="line">	  LOG.error(<span class="string">"Exception encountered "</span>, e);</span><br><span class="line">	  <span class="keyword">return</span> DistCpConstants.UNKNOWN_ERROR;</span><br><span class="line">	&#125;</span><br><span class="line">	<span class="keyword">return</span> DistCpConstants.SUCCESS;</span><br><span class="line">&#125;</span><br><span class="line"></span><br><span class="line"><span class="function"><span class="keyword">public</span> Job <span class="title">execute</span><span class="params">()</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">	Job job = createAndSubmitJob();</span><br><span class="line">	</span><br><span class="line">	<span class="keyword">if</span> (inputOptions.shouldBlock()) &#123;</span><br><span class="line">	  waitForJobCompletion(job);</span><br><span class="line">	&#125;</span><br><span class="line">	<span class="keyword">return</span> job;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>execute()方法执行任务，调用createAndSubmitJob()，该方法创建并提交任务到hadoop集群运行。
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br></pre></td><td class="code"><pre><span class="line"><span class="comment">/**</span></span><br><span class="line"><span class="comment"> * 创建并提交MR任务</span></span><br><span class="line"><span class="comment"> * <span class="doctag">@return</span> 已提交的MR任务实例</span></span><br><span class="line"><span class="comment"> */</span></span><br><span class="line"><span class="function"><span class="keyword">public</span> Job <span class="title">createAndSubmitJob</span><span class="params">()</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">    <span class="keyword">assert</span> inputOptions != <span class="keyword">null</span>;</span><br><span class="line">    <span class="function"><span class="keyword">assert</span> <span 
class="title">getConf</span><span class="params">()</span> !</span>= <span class="keyword">null</span>;</span><br><span class="line">    Job job = <span class="keyword">null</span>;</span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">      <span class="keyword">synchronized</span>(<span class="keyword">this</span>) &#123;</span><br><span class="line">        <span class="comment">//Don't cleanup while we are setting up.</span></span><br><span class="line">        metaFolder = createMetaFolderPath();</span><br><span class="line">        jobFS = metaFolder.getFileSystem(getConf());</span><br><span class="line">        job = createJob();</span><br><span class="line">      &#125;</span><br><span class="line">      <span class="keyword">if</span> (inputOptions.shouldUseDiff()) &#123;</span><br><span class="line">        <span class="keyword">if</span> (!DistCpSync.sync(inputOptions, getConf())) &#123;</span><br><span class="line">          inputOptions.disableUsingDiff();</span><br><span class="line">        &#125;</span><br><span class="line">      &#125;</span><br><span class="line">      createInputFileListing(job);</span><br><span class="line"></span><br><span class="line">      job.submit();</span><br><span class="line">      submitted = <span class="keyword">true</span>;</span><br><span class="line">    &#125; <span class="keyword">finally</span> &#123;</span><br><span class="line">      <span class="keyword">if</span> (!submitted) &#123;</span><br><span class="line">        cleanup();</span><br><span class="line">      &#125;</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">    String jobID = job.getJobID().toString();</span><br><span class="line">    job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);</span><br><span class="line">    LOG.info(<span class="string">"DistCp job-id: "</span> + jobID);</span><br><span 
class="line"></span><br><span class="line">    <span class="keyword">return</span> job;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure></p>
<p>下面分析源码中的几个关键字。</p>
<h3 id="metaFolder"><a href="#metaFolder" class="headerlink" title="metaFolder"></a>metaFolder</h3><p>在DistCp中定义为 <code>private Path metaFolder</code>，是一个 <code>org.apache.hadoop.fs.Path</code> 类型，顾名思义，是准备元数据的地方。</p>
<ul>
<li><p>createMetaFolderPath</p>
<p>  结合一个随机数生成一个临时目录。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> Path <span class="title">createMetaFolderPath</span><span class="params">()</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">	Configuration configuration = getConf();</span><br><span class="line">	Path stagingDir = JobSubmissionFiles.getStagingDir(</span><br><span class="line">	        <span class="keyword">new</span> Cluster(configuration), configuration);</span><br><span class="line">	Path metaFolderPath = <span class="keyword">new</span> Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));</span><br><span class="line">	<span class="keyword">if</span> (LOG.isDebugEnabled())</span><br><span class="line">	  LOG.debug(<span class="string">"Meta folder location: "</span> + metaFolderPath);</span><br><span class="line">	configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());    </span><br><span class="line">	<span class="keyword">return</span> metaFolderPath;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
</li>
<li><p>createInputFileListing(Job job)</p>
<p>  注意到在metaFolder下，通过getFileListingPath()生成fileList.seq文件，稍后会往fileList.seq中写入数据，这是一个SequenceFile文件，SequenceFile是一个由二进制序列化过的key/value的字节流组成的二进制存储文件，这个文件里将存放所有需要拷贝的源目录/文件信息列表。我们下面将介绍fileList.seq文件。</p>
</li>
</ul>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">protected</span> Path <span class="title">createInputFileListing</span><span class="params">(Job job)</span> <span class="keyword">throws</span> IOException </span>&#123;</span><br><span class="line">  Path fileListingPath = getFileListingPath();</span><br><span class="line">  CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(),</span><br><span class="line">      job.getCredentials(), inputOptions);</span><br><span class="line">  copyListing.buildListing(fileListingPath, inputOptions);</span><br><span class="line">  <span class="keyword">return</span> fileListingPath;</span><br><span class="line">&#125;</span><br><span class="line">	</span><br><span class="line"><span class="function"><span class="keyword">protected</span> Path <span class="title">getFileListingPath</span><span class="params">()</span> <span class="keyword">throws</span> IOException </span>&#123;</span><br><span class="line">  String fileListPathStr = metaFolder + <span class="string">"/fileList.seq"</span>;</span><br><span class="line">  Path path = <span class="keyword">new</span> Path(fileListPathStr);</span><br><span class="line">  <span class="keyword">return</span> <span class="keyword">new</span> Path(path.toUri().normalize().toString());</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<h3 id="fileList-seq文件"><a href="#fileList-seq文件" class="headerlink" title="fileList.seq文件"></a>fileList.seq文件</h3><p>这个文件里将存放所有需要拷贝的源目录/文件信息列表，注意到上面 <code>createInputFileListing</code> 中的 <code>copyListing.buildListing(fileListingPath, inputOptions)</code> 方法，如前所述，CopyListing有三种实现，最终都调用了其中SimpleCopyList类的buildList方法，该方法就是用来收集要拷贝的文件列表并写入fileList.seq文件，如下简略代码中，第一个参数就是Hadoop的Writer，通过 <code>getWriter(Path)</code> 获取。写入SequenceFile中的Key是源文件的Text格式的相对路径，即relPath；而Value则记录源文件的FileStatus格式的org.apache.hadoop.fs.FileStatus信息，这里FileStatus是hadoop已经封装好了的描述HDFS文件信息的类，但是DISTCP为了更好的处理数据，重新继承并封装了CopyListingFileStatus类。如下图，主要增加了aclEntries和xAttrs成员变量，关于文件权限和属性的，并覆盖了图中的方法。</p>
<p><img src="https://i.imgur.com/38o3TKy.png" alt="">
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">doBuildListing</span><span class="params">(SequenceFile.Writer fileListWriter,</span></span></span><br><span class="line"><span class="function"><span class="params">    DistCpOptions options)</span> <span class="keyword">throws</span> IOException </span>&#123;</span><br><span class="line">		<span class="comment">// 省略...</span></span><br><span class="line">&#125;</span><br><span class="line"><span class="keyword">private</span> SequenceFile.<span class="function">Writer <span class="title">getWriter</span><span class="params">(Path pathToListFile)</span> <span class="keyword">throws</span> IOException </span>&#123;</span><br><span class="line">  FileSystem fs = pathToListFile.getFileSystem(getConf());</span><br><span class="line">  <span class="keyword">if</span> (fs.exists(pathToListFile)) &#123;</span><br><span class="line">    fs.delete(pathToListFile, <span class="keyword">false</span>);</span><br><span class="line">  &#125;</span><br><span class="line">  <span class="keyword">return</span> SequenceFile.createWriter(getConf(),</span><br><span class="line">          SequenceFile.Writer.file(pathToListFile),</span><br><span class="line">          SequenceFile.Writer.keyClass(Text.class),</span><br><span class="line">          
SequenceFile.Writer.valueClass(CopyListingFileStatus.class),</span><br><span class="line">          SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure></p>
<h3 id="Job"><a href="#Job" class="headerlink" title="Job"></a>Job</h3><p>如下为创建任务的源码，和平时创建任务一样，创建Job对象，设置名称，配置输入输出路径，设置Mapper类和Reducer，注意这里只有map任务，设置输入输出类型等等，在这里最终将创建好的job返回给execute()，一层层向上，然后送给调度器。下面关注与文件拷贝相关的几点：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> Job <span class="title">createJob</span><span class="params">()</span> <span class="keyword">throws</span> IOException </span>&#123;</span><br><span class="line">  String jobName = <span class="string">"distcp"</span>;</span><br><span class="line">  String userChosenName = getConf().get(JobContext.JOB_NAME);</span><br><span class="line">  <span class="keyword">if</span> (userChosenName != <span class="keyword">null</span>)</span><br><span class="line">    jobName += <span class="string">": "</span> + userChosenName;</span><br><span class="line">  Job job = Job.getInstance(getConf());</span><br><span class="line">  job.setJobName(jobName);</span><br><span class="line">  job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));</span><br><span class="line">  job.setJarByClass(CopyMapper.class);</span><br><span class="line">  configureOutputFormat(job);</span><br><span class="line"></span><br><span class="line">  
job.setMapperClass(CopyMapper.class);</span><br><span class="line">  job.setNumReduceTasks(<span class="number">0</span>);</span><br><span class="line">  job.setMapOutputKeyClass(Text.class);</span><br><span class="line">  job.setMapOutputValueClass(Text.class);</span><br><span class="line">  job.setOutputFormatClass(CopyOutputFormat.class);</span><br><span class="line">  job.getConfiguration().set(JobContext.MAP_SPECULATIVE, <span class="string">"false"</span>);</span><br><span class="line">  job.getConfiguration().set(JobContext.NUM_MAPS,</span><br><span class="line">                String.valueOf(inputOptions.getMaxMaps()));</span><br><span class="line"></span><br><span class="line">  <span class="keyword">if</span> (inputOptions.getSslConfigurationFile() != <span class="keyword">null</span>) &#123;</span><br><span class="line">    setupSSLConfig(job);</span><br><span class="line">  &#125;</span><br><span class="line"></span><br><span class="line">  inputOptions.appendToConf(job.getConfiguration());</span><br><span class="line">  <span class="keyword">return</span> job;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<h2 id="InputFormat"><a href="#InputFormat" class="headerlink" title="InputFormat"></a>InputFormat</h2><p>在MapReduce中，InputFormat用来描述输入数据的格式，提供以下两个功能：</p>
<ul>
<li>数据切分，按照某个策略将输入数据切分成若干个split，以便确定MapTask的个数即Mapper的个数，在MapReduce框架中，一个split就意味着需要一个Map Task;</li>
<li>为Mapper提供输入数据，即给定一个split，(使用其中的RecordReader对象)将之解析为一个个的key/value键值对。 </li>
</ul>
<p>如下，getSplits用于切分数据，是一种逻辑分片的概念，数据还是按block存储，查看InputSplit源码，InputSplit只记录了Mapper要处理的数据的元数据信息，如起始位置、长度和所在的节点；
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">public</span> <span class="keyword">abstract</span> <span class="class"><span class="keyword">class</span> <span class="title">InputFormat</span>&lt;<span class="title">K</span>, <span class="title">V</span>&gt; </span>&#123;</span><br><span class="line">  <span class="keyword">public</span> <span class="keyword">abstract</span> </span><br><span class="line">    <span class="function">List&lt;InputSplit&gt; <span class="title">getSplits</span><span class="params">(JobContext context</span></span></span><br><span class="line"><span class="function"><span class="params">                               )</span> <span class="keyword">throws</span> IOException, InterruptedException</span>;</span><br><span class="line">  <span class="keyword">public</span> <span class="keyword">abstract</span> </span><br><span class="line">    <span class="function">RecordReader&lt;K,V&gt; <span class="title">createRecordReader</span><span class="params">(InputSplit split,</span></span></span><br><span class="line"><span class="function"><span class="params">                                         TaskAttemptContext context</span></span></span><br><span 
class="line"><span class="function"><span class="params">                                        )</span> <span class="keyword">throws</span> IOException, </span></span><br><span class="line"><span class="function">                                                 InterruptedException</span>;</span><br><span class="line">&#125;</span><br><span class="line"></span><br><span class="line"><span class="keyword">public</span> <span class="keyword">abstract</span> <span class="class"><span class="keyword">class</span> <span class="title">InputSplit</span> </span>&#123;</span><br><span class="line"> </span><br><span class="line">  <span class="function"><span class="keyword">public</span> <span class="keyword">abstract</span> <span class="keyword">long</span> <span class="title">getLength</span><span class="params">()</span> <span class="keyword">throws</span> IOException, InterruptedException</span>;</span><br><span class="line">  <span class="keyword">public</span> <span class="keyword">abstract</span> </span><br><span class="line">    String[] getLocations() <span class="keyword">throws</span> IOException, InterruptedException;</span><br><span class="line">  </span><br><span class="line">  <span class="meta">@Evolving</span></span><br><span class="line">  <span class="keyword">public</span> SplitLocationInfo[] getLocationInfo() <span class="keyword">throws</span> IOException &#123;</span><br><span class="line">    <span class="keyword">return</span> <span class="keyword">null</span>;</span><br><span class="line">  &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure></p>
<p>在DistCp中，有两种输入格式，在上一节创建job代码中可以看到 <code>job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));</code> 是根据配置设置输入格式的。
查看 <code>DistCpConstants</code> 类可以发现 <code>public static final String UNIFORMSIZE = &quot;uniformsize&quot;;</code>, 默认使用的是uniformsize，我们可以通过参数指定使用dynamic模式，两者区别参考<a href="#InputFormats-和-MapReduce-组件">1.3.2小节。</a></p>
<h2 id="CopyMapper"><a href="#CopyMapper" class="headerlink" title="CopyMapper"></a>CopyMapper</h2><p>在创建任务中，通过 <code>job.setMapperClass(CopyMapper.class);</code> 设置了mapper类。<code>CopyMapper</code> 是真正的mapper操作，hadoop中的mapper任务就是按照这个逻辑走的。在DistCp中没有reduce，仅有mapper任务，CopyMapper与其他的mapper没有什么区别，也是实现了基类 <code>Mapper</code> 中的 <code>setup()</code> 和 <code>map()</code> 两个核心方法。</p>
<ul>
<li>setup: 初始化、参数读取、文件校验等前期工作。</li>
</ul>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br></pre></td><td class="code"><pre><span class="line"><span class="meta">@Override</span></span><br><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">setup</span><span class="params">(Context context)</span> <span class="keyword">throws</span> IOException, InterruptedException </span>&#123;</span><br><span class="line">    conf = context.getConfiguration();</span><br><span class="line"></span><br><span class="line">    syncFolders = conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), <span class="keyword">false</span>);</span><br><span class="line">    ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), <span class="keyword">false</span>);</span><br><span class="line">    skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), <span class="keyword">false</span>);</span><br><span class="line">    overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), <span class="keyword">false</span>);</span><br><span class="line">    append = 
conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), <span class="keyword">false</span>);</span><br><span class="line">    preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.</span><br><span class="line">        PRESERVE_STATUS.getConfigLabel()));</span><br><span class="line"></span><br><span class="line">    targetWorkPath = <span class="keyword">new</span> Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));</span><br><span class="line">    Path targetFinalPath = <span class="keyword">new</span> Path(conf.get(</span><br><span class="line">            DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));</span><br><span class="line">    targetFS = targetFinalPath.getFileSystem(conf);</span><br><span class="line"></span><br><span class="line">    <span class="keyword">if</span> (targetFS.exists(targetFinalPath) &amp;&amp; targetFS.isFile(targetFinalPath)) &#123;</span><br><span class="line">      overWrite = <span class="keyword">true</span>; <span class="comment">// When target is an existing file, overwrite it.</span></span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">    <span class="keyword">if</span> (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != <span class="keyword">null</span>) &#123;</span><br><span class="line">      initializeSSLConf(context);</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<ul>
<li>map: 从方法参数可以看出，是对InputFormat分片数据每一行进行处理，其中省略了大部分代码，基本都是针对命令行参数对文件做一些前期校验，是否跳过等。最终如果需要拷贝，会通过 <code>RetriableFileCopyCommand</code> 类使用真正拷贝字节的方法 <code>copyBytes</code>, 就是普通的文件流操作了。</li>
</ul>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">map</span><span class="params">(Text relPath, CopyListingFileStatus sourceFileStatus,</span></span></span><br><span class="line"><span class="function"><span class="params">      Context context)</span> <span class="keyword">throws</span> IOException, InterruptedException </span>&#123;</span><br><span class="line">    copyFileWithRetry(description, sourceCurrStatus, target, context, action, fileAttributes);</span><br><span class="line">&#125;</span><br><span class="line"></span><br><span class="line"><span class="function"><span class="keyword">long</span> <span class="title">copyBytes</span><span class="params">(FileStatus sourceFileStatus, <span class="keyword">long</span> sourceOffset,</span></span></span><br><span class="line"><span 
class="function"><span class="params">      OutputStream outStream, <span class="keyword">int</span> bufferSize, Mapper.Context context)</span></span></span><br><span class="line"><span class="function">      <span class="keyword">throws</span> IOException </span>&#123;</span><br><span class="line">    Path source = sourceFileStatus.getPath();</span><br><span class="line">    <span class="keyword">byte</span> buf[] = <span class="keyword">new</span> <span class="keyword">byte</span>[bufferSize];</span><br><span class="line">    ThrottledInputStream inStream = <span class="keyword">null</span>;</span><br><span class="line">    <span class="keyword">long</span> totalBytesRead = <span class="number">0</span>;</span><br><span class="line"></span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">      inStream = getInputStream(source, context.getConfiguration());</span><br><span class="line">      <span class="keyword">int</span> bytesRead = readBytes(inStream, buf, sourceOffset);</span><br><span class="line">      <span class="keyword">while</span> (bytesRead &gt;= <span class="number">0</span>) &#123;</span><br><span class="line">        totalBytesRead += bytesRead;</span><br><span class="line">        <span class="keyword">if</span> (action == FileAction.APPEND) &#123;</span><br><span class="line">          sourceOffset += bytesRead;</span><br><span class="line">        &#125;</span><br><span class="line">        outStream.write(buf, <span class="number">0</span>, bytesRead);</span><br><span class="line">        updateContextStatus(totalBytesRead, context, sourceFileStatus);</span><br><span class="line">        bytesRead = readBytes(inStream, buf, sourceOffset);</span><br><span class="line">      &#125;</span><br><span class="line">      outStream.close();</span><br><span class="line">      outStream = <span class="keyword">null</span>;</span><br><span class="line">    &#125; <span class="keyword">finally</span> 
&#123;</span><br><span class="line">      IOUtils.cleanup(LOG, outStream, inStream);</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">return</span> totalBytesRead;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<hr>
<h1 id="命令参数"><a href="#命令参数" class="headerlink" title="命令参数"></a>命令参数</h1><table border="0" style="display: table;border-collapse: separate;border-spacing: 2px;border-color:#ccc;">


<tr class="a">

<td>Flag </td>

<td>Description </td>

<td>Notes</td>
    </tr>


<tr class="b">

<td><tt>-p[rbugpcaxt]</tt> </td>

<td>Preserve r: replication number<br> b: block size<br> u: user<br> g: group<br> p: permission<br> c: checksum-type<br> a: ACL<br> x: XAttr<br> t: timestamp </td>

<td>当指定 <tt>-update</tt>，更新的状态<b>不会同步</b>，除非文件大小不同（例如：重新创建文件）。 如果指定 <tt>-pa</tt>， DistCp 保留文件权限。</td>
    </tr>

<tr class="a">

<td><tt>-i</tt> </td>

<td>忽略失败 </td>

<td>相较默认情况，该选项会提供更精确的拷贝统计，并且保存拷贝失败的日志，方便调试。在没有完成任务中所有分块的尝试之前，一个map的失败不会导致整个任务失败。</td>
    </tr>

<tr class="b">

<td><tt>-log &lt;logdir&gt;</tt> </td>

<td>将日志记录到 &lt;logdir&gt; </td>

<td>DistCp为每个尝试拷贝的操作记录日志并作为map任务的输出。如果一个map失败了，重新执行这个map，日志将不会保留。</td>
    </tr>

<tr class="a">

<td><tt>-v</tt> </td>

<td>SKIP/COPY日志中的附加信息 (path, size) </td>

<td>该选项只能与 <tt>-log</tt> 一起使用.</td>
    </tr>

<tr class="b">

<td><tt>-m &lt;num_maps&gt;</tt> </td>

<td>并行拷贝的最大数量（多少个copy）</td>

<td>指定拷贝数据的map数量。 注意增加map数量并不能够有效提高吞吐量。</td>
    </tr>

<tr class="a">

<td><tt>-overwrite</tt> </td>

<td>覆盖目标文件 </td>

<td>如果map失败了并且没有指定 <tt>-i</tt>, 不仅仅那些失败的文件，这个split中的所有文件将被重新拷贝。 正如使用手册中的说明，这同时会改变生成目标路径的语义(it also changes the semantics for generating destination paths)，用户应该谨慎使用。</td>
    </tr>

<tr class="b">

<td><tt>-update</tt> </td>

<td>如果目标的size(大小), blocksize(块大小), 或 checksum(校验和) 不同，则进行覆盖</td>

<td>这并非“同步”操作，是否执行覆盖的标准是源和目标的文件大小、块大小和校验和是否相同；如果不同，则用源文件替换目标文件。</td>
    </tr>

<tr class="a">

<td><tt>-append</tt> </td>

<td>同名而不同长度的文件进行增量拷贝 </td>

<td>如果源文件的长度比目标文件大，将会检验共同长度部分的校验和，如果校验和匹配, 使用read和append功能，仅将不同的部分拷贝。<tt>-append</tt> 需与 <tt>-update</tt> 一起使用，且不能与 <tt>-skipcrccheck</tt> 同时指定。</td>
    </tr>

<tr class="b">

<td><tt>-f &lt;urilist_uri&gt;</tt> </td>

<td>使用 &lt;urilist_uri&gt; 作为源文件列表 </td>

<td>意思就是把每个源文件名列在命令行中。 <tt>urilist_uri</tt> 列表应该是一个完整的、合法的URI。</td>
    </tr>

<tr class="a">

<td><tt>-filters</tt> </td>

<td>The path to a file containing a list of pattern strings, one string per line, such that paths matching the pattern will be excluded from the copy. </td>

<td>Support regular expressions specified by java.util.regex.Pattern.</td>
    </tr>

<tr class="b">

<td><tt>-filelimit &lt;n&gt;</tt> </td>

<td>Limit the total number of files to be &lt;= n </td>

<td><b>Deprecated!</b> Ignored in the new DistCp.</td>
    </tr>

<tr class="a">

<td><tt>-sizelimit &lt;n&gt;</tt> </td>

<td>Limit the total size to be &lt;= n bytes </td>

<td><b>Deprecated!</b> Ignored in the new DistCp.</td>
    </tr>

<tr class="b">

<td><tt>-delete</tt> </td>

<td>Delete the files existing in the dst but not in src </td>

<td>The deletion is done by FS Shell. So the trash will be used, if it is enable. Delete is applicable only with update or overwrite options.</td>
    </tr>

<tr class="a">

<td><tt>-strategy {dynamic|uniformsize}</tt> </td>

<td>选择DistCp的拷贝策略</td>

<td>默认使用uniformsize。 (即，根据拷贝文件的总大小平衡每个map，与传统相仿。) 如果指定 “dynamic”， 将会使用<tt>DynamicInputFormat</tt>。 (详见体系架构)</td>
    </tr>

<tr class="b">

<td><tt>-bandwidth</tt> </td>

<td>给每个map指定带宽, in MB/second. </td>

<td>Each map will be restricted to consume only the specified bandwidth. This is not always exact. The map throttles back its bandwidth consumption during a copy, such that the <b>net</b> bandwidth used tends towards the specified value.</td>
    </tr>

<tr class="a">

<td><tt>-atomic {-tmp &lt;tmp_dir&gt;}</tt> </td>

<td>Specify atomic commit, with optional tmp directory. </td>

<td><tt>-atomic</tt> instructs DistCp to copy the source data to a temporary target location, and then move the temporary target to the final-location atomically. Data will either be available at final target in a complete and consistent form, or not at all. Optionally, <tt>-tmp</tt> may be used to specify the location of the tmp-target. If not specified, a default is chosen. <b>Note:</b> tmp_dir must be on the final target cluster.</td>
    </tr>

<tr class="b">

<td><tt>-mapredSslConf &lt;ssl_conf_file&gt;</tt> </td>

<td>Specify SSL Config file, to be used with HSFTP source </td>

<td>When using the hsftp protocol with a source, the security- related properties may be specified in a config-file and passed to DistCp. &lt;ssl_conf_file&gt; needs to be in the classpath.</td>
    </tr>

<tr class="a">

<td><tt>-async</tt> </td>

<td>异步执行DistCp. Hadoop任务一运行立即返回。 </td>

<td>记录Hadoop任务id， 以便追踪。</td>
    </tr>

<tr class="b">

<td><tt>-diff &lt;oldSnapshot&gt; &lt;newSnapshot&gt;</tt> </td>

<td>Use snapshot diff report between given two snapshots to identify the difference between source and target, and apply the diff to the target to make it in sync with source. </td>

<td>This option is valid only with <tt>-update</tt> option and the following conditions should be satisfied. 
<ol style="list-style-type: decimal">
<li> Both the source and the target FileSystem must be DistributedFileSystem.</li> 
<li> Two snapshots <tt>&lt;oldSnapshot&gt;</tt> and <tt>&lt;newSnapshot&gt;</tt> have been created on the source FS, and <tt>&lt;oldSnapshot&gt;</tt> is older than <tt>&lt;newSnapshot&gt;</tt>. </li> 
<li> The target has the same snapshot <tt>&lt;oldSnapshot&gt;</tt>. No changes have been made on the target since <tt>&lt;oldSnapshot&gt;</tt> was created, thus <tt>&lt;oldSnapshot&gt;</tt> has the same content as the current state of the target. All the files/directories in the target are the same with source’s <tt>&lt;oldSnapshot&gt;</tt>.</li></ol> </td>
    </tr>

<tr class="a">

<td><tt>-rdiff &lt;newSnapshot&gt; &lt;oldSnapshot&gt;</tt> </td>

<td>Use snapshot diff report between given two snapshots to identify what has been changed on the target since the snapshot <tt>&lt;oldSnapshot&gt;</tt> was created on the target, and apply the diff reversely to the target, and copy modified files from the source’s <tt>&lt;oldSnapshot&gt;</tt>, to make the target the same as <tt>&lt;oldSnapshot&gt;</tt>. </td>

<td>This option is valid only with <tt>-update</tt> option and the following conditions should be satisfied. 
<ol style="list-style-type: decimal">
<li>Both the source and the target FileSystem must be DistributedFileSystem. The source and the target can be two different clusters/paths, or they can be exactly the same cluster/path. In the latter case, modified files are copied from target’s <tt>&lt;oldSnapshot&gt;</tt> to target’s current state).</li> 
<li> Two snapshots <tt>&lt;newSnapshot&gt;</tt> and <tt>&lt;oldSnapshot&gt;</tt> have been created on the target FS, and <tt>&lt;oldSnapshot&gt;</tt> is older than <tt>&lt;newSnapshot&gt;</tt>. No change has been made on target since <tt>&lt;newSnapshot&gt;</tt> was created on the target. </li> 
<li> The source has the same snapshot <tt>&lt;oldSnapshot&gt;</tt>, which has the same content as the <tt>&lt;oldSnapshot&gt;</tt> on the target. All the files/directories in the target’s <tt>&lt;oldSnapshot&gt;</tt> are the same with source’s <tt>&lt;oldSnapshot&gt;</tt>.</li> </ol> </td>
    </tr>

<tr class="b">

<td><tt>-numListstatusThreads</tt> </td>

<td>Number of threads to use for building file listing </td>

<td>At most 40 threads.</td>
    </tr>

<tr class="a">

<td><tt>-skipcrccheck</tt> </td>

<td>是否跳过源路径和目标路径之间的 crc 检查。 </td>
    </tr>

<tr class="b">

<td><tt>-blocksperchunk &lt;blocksperchunk&gt;</tt> </td>

<td>每个chunk的块数量，指定后，将文件分割成多个块并行复制。 </td>

<td>If set to a positive value, files with more blocks than this value will be split into chunks of <tt>&lt;blocksperchunk&gt;</tt> blocks to be transferred in parallel, and reassembled on the destination. By default, <tt>&lt;blocksperchunk&gt;</tt> is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. </td>
    </tr>

<tr class="a">

<td><tt>-copybuffersize &lt;copybuffersize&gt;</tt> </td>

<td>Size of the copy buffer to use. By default, <tt>&lt;copybuffersize&gt;</tt> is set to 8192B </td>
    </tr>
</table>

<hr>
<h1 id="参考资料"><a href="#参考资料" class="headerlink" title="参考资料"></a>参考资料</h1><ul>
<li><a href="https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html" title="Hadoop参考文档" target="_blank" rel="noopener">Hadoop参考文档</a></li>
<li><a href="https://github.com/apache/hadoop/archive/rel/release-2.7.3.zip" title="release-2.7.3.zip" target="_blank" rel="noopener">Hadoop-2.7.3源码</a></li>
</ul>

      
    </div>
    
    
    

    

    

    

    <footer class="post-footer">
      

      
      
      

      
        <div class="post-nav">
          <div class="post-nav-next post-nav-item">
            
              <a href="/2019/02/25/cryptic_interceptor/" rel="next" title="使用拦截器进行数据加解密">
                <i class="fa fa-chevron-left"></i> 使用拦截器进行数据加解密
              </a>
            
          </div>

          <span class="post-nav-divider"></span>

          <div class="post-nav-prev post-nav-item">
            
              <a href="/2019/02/25/docker_elk/" rel="prev" title="基于docker搭建ELK系统">
                基于docker搭建ELK系统 <i class="fa fa-chevron-right"></i>
              </a>
            
          </div>
        </div>
      

      
      
    </footer>
  </div>
  
  
  
  </article>



    <div class="post-spread">
      
    </div>
  </div>


          </div>
          


          

  



        </div>
        
          
  
  <div class="sidebar-toggle">
    <div class="sidebar-toggle-line-wrap">
      <span class="sidebar-toggle-line sidebar-toggle-line-first"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-middle"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-last"></span>
    </div>
  </div>

  <aside id="sidebar" class="sidebar">
    
    <div class="sidebar-inner">

      

      
        <ul class="sidebar-nav motion-element">
          <li class="sidebar-nav-toc sidebar-nav-active" data-target="post-toc-wrap">
            文章目录
          </li>
          <li class="sidebar-nav-overview" data-target="site-overview-wrap">
            站点概览
          </li>
        </ul>
      

      <section class="site-overview-wrap sidebar-panel">
        <div class="site-overview">
          <div class="site-author motion-element" itemprop="author" itemscope itemtype="http://schema.org/Person">
            
              <p class="site-author-name" itemprop="name">yampery</p>
              <p class="site-description motion-element" itemprop="description"></p>
          </div>

          <nav class="site-state motion-element">

            
              <div class="site-state-item site-state-posts">
              
                <a href="/archives/">
              
                  <span class="site-state-item-count">6</span>
                  <span class="site-state-item-name">日志</span>
                </a>
              </div>
            

            

            

          </nav>

          
            <div class="feed-link motion-element">
              <a href="/atom.xml" rel="alternate">
                <i class="fa fa-rss"></i>
                RSS
              </a>
            </div>
          

          
            <div class="links-of-author motion-element">
                
                  <span class="links-of-author-item">
                    <a href="https://github.com/yampery" target="_blank" title="GitHub">
                      
                        <i class="fa fa-fw fa-github"></i>GitHub</a>
                  </span>
                
                  <span class="links-of-author-item">
                    <a href="mailto:1yampery@gmail.com" target="_blank" title="E-Mail">
                      
                        <i class="fa fa-fw fa-envelope"></i>E-Mail</a>
                  </span>
                
            </div>
          

          
          

          
          

          

        </div>
      </section>

      
      <!--noindex-->
        <section class="post-toc-wrap motion-element sidebar-panel sidebar-panel-active">
          <div class="post-toc">

            
              
            

            
              <div class="post-toc-content"><ol class="nav"><li class="nav-item nav-level-1"><a class="nav-link" href="#distcp使用"><span class="nav-number">1.</span> <span class="nav-text">distcp使用</span></a><ol class="nav-child"><li class="nav-item nav-level-2"><a class="nav-link" href="#基本使用"><span class="nav-number">1.1.</span> <span class="nav-text">基本使用</span></a></li><li class="nav-item nav-level-2"><a class="nav-link" href="#Update-和-Overwrite"><span class="nav-number">1.2.</span> <span class="nav-text">Update 和 Overwrite</span></a></li><li class="nav-item nav-level-2"><a class="nav-link" href="#DistCp体系结构"><span class="nav-number">1.3.</span> <span class="nav-text">DistCp体系结构</span></a><ol class="nav-child"><li class="nav-item nav-level-3"><a class="nav-link" href="#DistCp-Driver"><span class="nav-number">1.3.1.</span> <span class="nav-text">DistCp Driver</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#InputFormats-和-MapReduce-组件"><span class="nav-number">1.3.2.</span> <span class="nav-text">InputFormats 和 MapReduce 组件</span></a></li></ol></li><li class="nav-item nav-level-2"><a class="nav-link" href="#Map-sizing"><span class="nav-number">1.4.</span> <span class="nav-text">Map sizing</span></a></li><li class="nav-item nav-level-2"><a class="nav-link" href="#不同版本间拷贝"><span class="nav-number">1.5.</span> <span class="nav-text">不同版本间拷贝</span></a></li><li class="nav-item nav-level-2"><a class="nav-link" href="#MapReduce-和-其他副作用"><span class="nav-number">1.6.</span> <span class="nav-text">MapReduce 和 其他副作用</span></a></li></ol></li><li class="nav-item nav-level-1"><a class="nav-link" href="#从源码理解DistCp"><span class="nav-number">2.</span> <span class="nav-text">从源码理解DistCp</span></a><ol class="nav-child"><li class="nav-item nav-level-2"><a class="nav-link" href="#DistCp"><span class="nav-number">2.1.</span> <span class="nav-text">DistCp</span></a><ol class="nav-child"><li class="nav-item nav-level-3"><a class="nav-link" 
href="#metaFolder"><span class="nav-number">2.1.1.</span> <span class="nav-text">metaFolder</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#fileList-seq文件"><span class="nav-number">2.1.2.</span> <span class="nav-text">fileList.seq文件</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#Job"><span class="nav-number">2.1.3.</span> <span class="nav-text">Job</span></a></li></ol></li><li class="nav-item nav-level-2"><a class="nav-link" href="#InputFormat"><span class="nav-number">2.2.</span> <span class="nav-text">InputFormat</span></a></li><li class="nav-item nav-level-2"><a class="nav-link" href="#CopyMapper"><span class="nav-number">2.3.</span> <span class="nav-text">CopyMapper</span></a></li></ol></li><li class="nav-item nav-level-1"><a class="nav-link" href="#命令参数"><span class="nav-number">3.</span> <span class="nav-text">命令参数</span></a></li><li class="nav-item nav-level-1"><a class="nav-link" href="#参考资料"><span class="nav-number">4.</span> <span class="nav-text">参考资料</span></a></li></ol></div>
            

          </div>
        </section>
      <!--/noindex-->
      

      

    </div>
  </aside>


        
      </div>
    </main>

    <footer id="footer" class="footer">
      <div class="footer-inner">
        <div class="copyright">&copy; 2018 &mdash; <span itemprop="copyrightYear">2019</span>
  <span class="with-love">
    <i class="fa fa-user"></i>
  </span>
  <span class="author" itemprop="copyrightHolder">yampery</span>

  
</div>


  <div class="powered-by">由 <a class="theme-link" target="_blank" href="https://hexo.io" rel="external nofollow">Hexo</a> 强力驱动</div>



  <span class="post-meta-divider">|</span>



  <div class="theme-info">主题 &mdash; <a class="theme-link" target="_blank" href="https://github.com/iissnan/hexo-theme-next" rel="external nofollow">NexT.Mist</a> v5.1.4</div>




        
<div class="busuanzi-count">
  <script async src="https://busuanzi.ibruce.info/busuanzi/2.3/busuanzi.pure.mini.js"></script>

  
    <span class="site-uv">
      <i class="fa fa-user"></i> 本站访客数
      <span class="busuanzi-value" id="busuanzi_value_site_uv"></span>
      人次
    </span>
  

  
    <span class="site-pv">
      <i class="fa fa-eye"></i> 本站总访问量
      <span class="busuanzi-value" id="busuanzi_value_site_pv"></span>
      次
    </span>
  
</div>








        
      </div>
    </footer>

    
      <div class="back-to-top">
        <i class="fa fa-arrow-up"></i>
        
      </div>
    

    

  </div>

  

<script type="text/javascript">
  // Discard non-native (missing or partial) Promise implementations so that
  // feature-detection later can install a proper polyfill instead of using
  // a broken one.
  if ('[object Function]' !== Object.prototype.toString.call(window.Promise)) {
    window.Promise = null;
  }
</script>









  












  
  
    <script type="text/javascript" src="/lib/jquery/index.js?v=2.1.3"></script>
  

  
  
    <script type="text/javascript" src="/lib/fastclick/lib/fastclick.min.js?v=1.0.6"></script>
  

  
  
    <script type="text/javascript" src="/lib/jquery_lazyload/jquery.lazyload.js?v=1.9.7"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.ui.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/fancybox/source/jquery.fancybox.pack.js?v=2.1.5"></script>
  


  


  <script type="text/javascript" src="/js/src/utils.js?v=5.1.4"></script>

  <script type="text/javascript" src="/js/src/motion.js?v=5.1.4"></script>



  
  

  
  <script type="text/javascript" src="/js/src/scrollspy.js?v=5.1.4"></script>
<script type="text/javascript" src="/js/src/post-details.js?v=5.1.4"></script>



  


  <script type="text/javascript" src="/js/src/bootstrap.js?v=5.1.4"></script>



  


  




	





  





  












  





  

  
  <script src="https://cdn1.lncld.net/static/js/av-core-mini-0.6.4.js"></script>
  <script>AV.initialize("35rGUMRzp5wPN2jmmIUWeBNh-gzGzoHsz", "fmF6mJDmgl0FkqBKX8I2HiGF");</script>
  <script>
    // Batch-fetch stored visit counts for every .leancloud_visitors node on
    // the page (index/archive view) and render them into the count spans.
    function showTime(Counter) {
      var countQuery = new AV.Query(Counter);
      var pageUrls = [];
      var $visitors = $(".leancloud_visitors");

      // Each node's id attribute holds the page URL used as the record key.
      $visitors.each(function () {
        pageUrls.push( $(this).attr("id").trim() );
      });

      countQuery.containedIn('url', pageUrls);
      countQuery.find()
        .done(function (results) {
          var COUNT_CONTAINER_REF = '.leancloud-visitors-count';

          // No records at all: show 0 everywhere.
          if (results.length === 0) {
            $visitors.find(COUNT_CONTAINER_REF).text(0);
            return;
          }

          // Render the stored count for every record that came back.
          results.forEach(function (record) {
            var node = document.getElementById(record.get('url'));
            $(node).find(COUNT_CONTAINER_REF).text(record.get('time'));
          });

          // Pages with no record yet show 0 instead of staying blank.
          pageUrls.forEach(function (pageUrl) {
            var countSpan = $(document.getElementById(pageUrl)).find(COUNT_CONTAINER_REF);
            if (countSpan.text() == '') {
              countSpan.text(0);
            }
          });
        })
        .fail(function (object, error) {
          console.log("Error: " + error.code + " " + error.message);
        });
    }

    // Record one visit for the current post page: increment the existing
    // Counter record for this URL, or create a new one starting at 1, then
    // render the resulting count into the page.
    function addCount(Counter) {
      var $visitors = $(".leancloud_visitors");
      var pageUrl = $visitors.attr('id').trim();
      var pageTitle = $visitors.attr('data-flag-title').trim();
      var lookup = new AV.Query(Counter);

      // Shared renderer: write the saved record's count into the page.
      function renderCount(record) {
        var $element = $(document.getElementById(pageUrl));
        $element.find('.leancloud-visitors-count').text(record.get('time'));
      }

      lookup.equalTo("url", pageUrl);
      lookup.find({
        success: function (results) {
          if (results.length > 0) {
            // Existing record: fetch-on-save + increment keeps the bump atomic.
            var counter = results[0];
            counter.fetchWhenSave(true);
            counter.increment("time");
            counter.save(null, {
              success: renderCount,
              error: function (counter, error) {
                console.log('Failed to save Visitor num, with error message: ' + error.message);
              }
            });
          } else {
            // First visit: create a publicly writable record so later
            // anonymous visitors can increment it.
            var newcounter = new Counter();
            var acl = new AV.ACL();
            acl.setPublicReadAccess(true);
            acl.setPublicWriteAccess(true);
            newcounter.setACL(acl);
            newcounter.set("title", pageTitle);
            newcounter.set("url", pageUrl);
            newcounter.set("time", 1);
            newcounter.save(null, {
              success: renderCount,
              error: function (newcounter, error) {
                console.log('Failed to create');
              }
            });
          }
        },
        error: function (error) {
          console.log('Error:' + error.code + " " + error.message);
        }
      });
    }

    $(function() {
      var Counter = AV.Object.extend("Counter");
      // Exactly one .leancloud_visitors node → a single post page: count the
      // visit. Multiple post titles → a listing page: only display counts.
      var visitorNodeCount = $('.leancloud_visitors').length;
      if (visitorNodeCount == 1) {
        addCount(Counter);
      } else if ($('.post-title-link').length > 1) {
        showTime(Counter);
      }
    });
  </script>



  

  

  
  

  

  

  

</body>
</html>
